Spark Window Aggregate Functions

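This post shows how to calculate the sum, min, max and average salary for each department or job using Spark SQL aggregate window functions and a WindowSpec. Aggregate window functions do not need an orderBy clause: with only partitionBy, the aggregate is computed over every row of the partition.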

Packages to import
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Not needed in spark-shell, where `spark` and its implicits already exist.
// The app name and local master below are example values only.
val spark = SparkSession.builder().appName("WindowAggregates").master("local[*]").getOrCreate()
import spark.implicits._  // enables the $"col" and 'col column syntax used below

Create DataFrame
val empDF = spark.createDataFrame(Seq(
      (7369, "SMITH", "CLERK", 7902, "17-Dec-80", 800, 20, 10),
      (7499, "ALLEN", "SALESMAN", 7698, "20-Feb-81", 1600, 300, 30),
      (7521, "WARD", "SALESMAN", 7698, "22-Feb-81", 1250, 500, 30),
      (7566, "JONES", "MANAGER", 7839, "2-Apr-81", 2975, 0, 20),
      (7654, "MARTIN", "SALESMAN", 7698, "28-Sep-81", 1250, 1400, 30),
      (7698, "BLAKE", "MANAGER", 7839, "1-May-81", 2850, 0, 30),
      (7782, "CLARK", "MANAGER", 7839, "9-Jun-81", 2450, 0, 10),
      (7788, "SCOTT", "ANALYST", 7566, "19-Apr-87", 3000, 0, 20),
      (7839, "KING", "PRESIDENT", 0, "17-Nov-81", 5000, 0, 10),
      (7844, "TURNER", "SALESMAN", 7698, "8-Sep-81", 1500, 0, 30),
      (7876, "ADAMS", "CLERK", 7788, "23-May-87", 1100, 0, 20)
    )).toDF("empno", "ename", "job", "mgr", "hiredate", "sal", "comm", "deptno")
 
Find total salary for each department
// Every row in a department gets the same total, so distinct() leaves one row per department.
empDF.withColumn("total_sal", sum("sal").over(Window.partitionBy($"deptno")))
     .select($"deptno", $"total_sal")
     .distinct()
     .show()
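
To illustrate why the orderBy clause is left out here, the sketch below compares the same sum over a window with and without ordering. When a window has no ordering, Spark's default frame covers the whole partition; once an ordering is defined, the default frame grows from the start of the partition to the current row, which turns the sum into a running total. The session settings, the three-row demoDF and all variable names are assumptions made for this example, not part of the original post. It is meant to be pasted into spark-shell or a Scala worksheet.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().appName("frame-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical three-row sample, not the empDF from the post.
val demoDF = Seq((10, 800), (10, 2450), (10, 5000)).toDF("deptno", "sal")

// No ordering: the frame is the whole partition.
val wholePartition = Window.partitionBy($"deptno")
// With ordering: the default frame runs from the partition start to the current row.
val runningFrame = Window.partitionBy($"deptno").orderBy($"sal")

val result = demoDF
  .withColumn("total_sal", sum($"sal").over(wholePartition))
  .withColumn("running_sal", sum($"sal").over(runningFrame))
  .orderBy($"sal")

result.show()
```

In this sample total_sal is 8250 on every row, while running_sal grows as 800, 3250, 8250.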

 
Find max salary for each department
empDF.withColumn("max_sal", max("sal").over(Window.partitionBy($"deptno")))
     .select($"deptno", $"max_sal")
     .distinct()
     .show()

Find sum, max, min and average salary for each job
val jobWindow = Window.partitionBy($"job")
// Every row of a job carries the same four aggregates.
// row_no numbers the rows of each job by salary, so row_no === 1 keeps one row per job.
empDF.withColumn("total_sal", sum($"sal").over(jobWindow))
     .withColumn("max_sal", max($"sal").over(jobWindow))
     .withColumn("min_sal", min($"sal").over(jobWindow))
     .withColumn("avg_sal", avg($"sal").over(jobWindow))
     .withColumn("row_no", row_number().over(jobWindow.orderBy($"sal".desc)))
     .where(col("row_no") === 1)
     .select("job", "total_sal", "max_sal", "min_sal", "avg_sal")
     .show()
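
Because the query above keeps only one row per job, a similar summary could also be produced with a plain groupBy and agg. Window functions earn their keep when the aggregates have to sit next to each original row. The following is a minimal sketch of the groupBy variant, not the approach used in this post. The session settings, the small jobsDF sample and the variable names are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().appName("groupby-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample rows, not the empDF from the post.
val jobsDF = Seq(
  ("CLERK", 800), ("CLERK", 1100),
  ("MANAGER", 2975), ("MANAGER", 2850), ("MANAGER", 2450)
).toDF("job", "sal")

// One output row per job, with the same four aggregates as the window version.
val summary = jobsDF
  .groupBy($"job")
  .agg(
    sum($"sal").as("total_sal"),
    max($"sal").as("max_sal"),
    min($"sal").as("min_sal"),
    avg($"sal").as("avg_sal"))
  .orderBy($"job")

summary.show()
```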

 

Find third highest salary for each department

val deptWindow = Window.partitionBy($"deptno")
// row_number() numbers the rows of each department 1, 2, 3, ... with the highest salary first.
empDF.withColumn("row_no", row_number().over(deptWindow.orderBy($"sal".desc)))
     .where(col("row_no") === 3)
     .show()
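
One caveat: row_number() gives tied salaries different numbers, so row 3 is the third row, not necessarily the third distinct salary. If the third highest distinct salary is wanted, dense_rank() is one option, since it gives tied rows the same rank without leaving gaps. The sketch below shows the difference on a hypothetical sample with a tie. The session settings, tiedDF and the variable names are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().appName("dense-rank-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample in which two employees share the second highest salary.
val tiedDF = Seq(
  (30, "A", 2850), (30, "B", 1600), (30, "C", 1600), (30, "D", 1500)
).toDF("deptno", "ename", "sal")

val bySalDesc = Window.partitionBy($"deptno").orderBy($"sal".desc)

val ranked = tiedDF
  .withColumn("row_no", row_number().over(bySalDesc))   // 1, 2, 3, 4
  .withColumn("dense_rk", dense_rank().over(bySalDesc)) // 1, 2, 2, 3

// Rows holding the third highest distinct salary in each department.
val thirdDistinct = ranked.where($"dense_rk" === 3)
thirdDistinct.show()
```

Here row_no === 3 would return one of the two 1600 rows, while dense_rk === 3 returns employee D with 1500.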

